这可能是最好的RxJava 2.x 教程(完结版)
前言
首先欢迎关注 nanchen 的 Android 小窝,希望以后我的推送能带给你开心,这篇文章是我公众号推送的第一篇文章, RxJava 系列,欢迎拍砖。
为什么要学 RxJava?
提升开发效率,降低维护成本一直是开发团队永恒不变的宗旨。近两年来国内的技术圈子中越来越多的开始提及 RxJava ,越来越多的应用和面试中都会有 RxJava ,而就目前的情况,Android 的网络库基本被 Retrofit + OkHttp 一统天下了,而配合上响应式编程 RxJava 可谓如鱼得水。想必大家肯定被近期的 Kotlin 炸开了锅,笔者也在闲暇之时去了解了一番(作为一个与时俱进的有理想的青年怎么可能不与时俱进?),发现其中有个非常好的优点就是简洁,支持函数式编程。是的, RxJava 最大的优点也是简洁,但它不止是简洁,而且是 随着程序逻辑变得越来越复杂,它依然能够保持简洁 (这货洁身自好呀有木有)。
咳咳,要例子,猛戳这里:给 Android 开发者的 RxJava 详解
开始
RxJava 2.x 已经按照 Reactive-Streams specification 规范完全的重写了,maven也被放在了io.reactivex.rxjava2:rxjava:2.x.y
下,所以 RxJava 2.x 独立于 RxJava 1.x 而存在,而随后官方宣布的将在一段时间后终止对 RxJava 1.x 的维护,所以对于熟悉 RxJava 1.x 的老司机自然可以直接看一下 2.x 的文档和异同就能轻松上手了,而对于不熟悉的年轻司机,不要慌,本酱带你装逼带你飞,马上就发车,坐稳了:https://github.com/nanchen2251/RxJava2Examples
你只需要在 build.gradle 中加上:compile 'io.reactivex.rxjava2:rxjava:2.1.1'
(2.1.1为写此文章时的最新版本)
接口变化
RxJava 2.x 拥有了新的特性,其依赖于4个基础接口,它们分别是
Publisher
Subscriber
Subscription
Processor
其中最核心的莫过于 Publisher
和 Subscriber
。Publisher
可以发出一系列的事件,而 Subscriber
负责和处理这些事件。
其中用的比较多的自然是 Publisher
的 Flowable
,它支持背压。关于背压给个简洁的定义就是:
背压是指在异步场景中,被观察者发送事件速度远快于观察者的处理速度的情况下,一种告诉上游的被观察者降低发送速度的策略。
简而言之,背压是流速控制的一种策略。有兴趣的可以看一下官方对于背压的讲解。
可以明显地发现,RxJava 2.x 最大的改动就是对于 backpressure
的处理,为此将原来的 Observable
拆分成了新的 Observable
和 Flowable
,同时其他相关部分也同时进行了拆分,但令人庆幸的是,是它,是它,还是它,还是我们最熟悉和最喜欢的 RxJava。
观察者模式
大家可能都知道, RxJava 以观察者模式为骨架,在 2.0 中依旧如此。
不过此次更新中,出现了两种观察者模式:
Observable ( 被观察者 ) / Observer ( 观察者 )
Flowable (被观察者)/ Subscriber (观察者)
在 RxJava 2.x 中,Observable 用于订阅 Observer,不再支持背压(1.x 中可以使用背压策略),而 Flowable 用于订阅 Subscriber , 是支持背压(Backpressure)的。
Observable
在 RxJava 1.x 中,我们最熟悉的莫过于 Observable
这个类了,笔者在刚刚使用 RxJava 2.x 的时候,创建了 一个 Observable
,瞬间一脸懵逼有木有,居然连我们最最熟悉的 Subscriber
都没了,取而代之的是 ObservableEmmiter
,俗称发射器。此外,由于没有了Subscriber
的踪影,我们创建观察者时需使用 Observer
。而 Observer
也不是我们熟悉的那个 Observer
,又出现了一个 Disposable
参数带你装逼带你飞。
废话不多说,从会用开始,还记得 RxJava 的三部曲吗?
第一步:初始化 Observable
第二步:初始化 Observer
第三步:建立订阅关系
Observable.create(new ObservableOnSubscribe<Integer>() { // 第一步:初始化Observable
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
Log.e(TAG, "Observable emit 1" + "\n");
e.onNext(1);
Log.e(TAG, "Observable emit 2" + "\n");
e.onNext(2);
Log.e(TAG, "Observable emit 3" + "\n");
e.onNext(3);
e.onComplete();
Log.e(TAG, "Observable emit 4" + "\n" );
e.onNext(4);
}
}).subscribe(new Observer<Integer>() { // 第三步:订阅
// 第二步:初始化Observer
private int i;
private Disposable mDisposable;
@Override
public void onSubscribe(@NonNull Disposable d) {
mDisposable = d;
}
@Override
public void onNext(@NonNull Integer integer) {
i++;
if (i == 2) {
// 在RxJava 2.x 中,新增的Disposable可以做到切断的操作,让Observer观察者不再接收上游事件
mDisposable.dispose();
}
}
@Override
public void onError(@NonNull Throwable e) {
Log.e(TAG, "onError : value : " + e.getMessage() + "\n" );
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete" + "\n" );
}
});
不难看出,RxJava 2.x 与 1.x 还是存在着一些区别的。首先,创建 Observable
时,回调的是 ObservableEmitter
,字面意思即发射器,并且直接 throws Exception。其次,在创建的 Observer 中,也多了一个回调方法:onSubscribe
,传递参数为Disposable
,Disposable
相当于 RxJava 1.x 中的 Subscription
, 用于解除订阅。可以看到示例代码中,在 i 自增到 2 的时候,订阅关系被切断。
07-03 14:24:11.663 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: onSubscribe : false07-03 14:24:11.664 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: Observable emit 107-03 14:24:11.665 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: onNext : value : 107-03 14:24:11.666 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: Observable emit 207-03 14:24:11.667 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: onNext : value : 207-03 14:24:11.668 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: onNext : isDisposable : true07-03 14:24:11.669 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: Observable emit 307-03 14:24:11.670 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: Observable emit 4
当然,我们的 RxJava 2.x 也为我们保留了简化订阅方法,我们可以根据需求,进行相应的简化订阅,只不过传入对象改为了 Consumer
。
Consumer
即消费者,用于接收单个值,BiConsumer
则是接收两个值,Function
用于变换对象,Predicate
用于判断。这些接口命名大多参照了 Java 8 ,熟悉 Java 8 新特性的应该都知道意思,这里也不再赘述。
线程调度
关于线程切换这点,RxJava 1.x 和 RxJava 2.x 的实现思路是一样的。这里简单的说一下,以便于我们的新司机入手。
subScribeOn
同 RxJava 1.x 一样,subscribeOn
用于指定 subscribe()
时所发生的线程,从源码角度可以看出,内部线程调度是通过 ObservableSubscribeOn
来实现的。
ObservableSubscribeOn
的核心源码在 subscribeActual
方法中,通过代理的方式使用 SubscribeOnObserver
包装 Observer
后,设置 Disposable
来将 subscribe
切换到 Scheduler
线程中。
observeOn
observeOn
方法用于指定下游 Observer
回调发生的线程。
线程切换需要注意的
RxJava 内置的线程调度器的确可以让我们的线程切换得心应手,但其中也有些需要注意的地方。
简单地说,
subscribeOn()
指定的就是发射事件的线程,observerOn
指定的就是订阅者接收事件的线程。多次指定发射事件的线程只有第一次指定的有效,也就是说多次调用
subscribeOn()
只有第一次的有效,其余的会被忽略。但多次指定订阅者接收线程是可以的,也就是说每调用一次
observerOn()
,下游的线程就会切换一次。
Observable.create(new ObservableOnSubscribe<Integer>() { @Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
Log.e(TAG, "Observable thread is : " + Thread.currentThread().getName());
e.onNext(1);
e.onComplete();
}
}).subscribeOn(Schedulers.newThread())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.doOnNext(new Consumer<Integer>() { @Override
public void accept(@NonNull Integer integer) throws Exception {
Log.e(TAG, "After observeOn(mainThread),Current thread is " + Thread.currentThread().getName());
}
})
.observeOn(Schedulers.io())
.subscribe(new Consumer<Integer>() { @Override
public void accept(@NonNull Integer integer) throws Exception {
Log.e(TAG, "After observeOn(io),Current thread is " + Thread.currentThread().getName());
}
});
输出:
07-03 14:54:01.177 15121-15438/com.nanchen.rxjava2examples E/RxThreadActivity: Observable thread is : RxNewThreadScheduler-107-03 14:54:01.178 15121-15121/com.nanchen.rxjava2examples E/RxThreadActivity: After observeOn(mainThread),Current thread is main
07-03 14:54:01.179 15121-15439/com.nanchen.rxjava2examples E/RxThreadActivity: After observeOn(io),Current thread is RxCachedThreadScheduler-2
实例代码中,分别用 Schedulers.newThread()
和 Schedulers.io()
对发射线程进行切换,并采用 observeOn(AndroidSchedulers.mainThread()
和 Schedulers.io()
进行了接收线程的切换。可以看到输出中发射线程仅仅响应了第一个 newThread
,但每调用一次 observeOn()
,线程便会切换一次,因此如果我们有类似的需求时,便知道如何处理了。
RxJava 中,已经内置了很多线程选项供我们选择,例如有:
Schedulers.io()
代表io操作的线程, 通常用于网络,读写文件等io密集型的操作;Schedulers.computation()
代表CPU计算密集型的操作, 例如需要大量计算的操作;Schedulers.newThread()
代表一个常规的新线程;AndroidSchedulers.mainThread()
代表Android的主线程
这些内置的 Scheduler
已经足够满足我们开发的需求,因此我们应该使用内置的这些选项,而 RxJava 内部使用的是线程池来维护这些线程,所以效率也比较高。
操作符
关于操作符,在官方文档中已经做了非常完善的讲解,并且笔者前面的系列教程中也着重讲解了绝大多数的操作符作用,这里受于篇幅限制,就不多做赘述,只挑选几个进行实际情景的讲解。
map
map
操作符可以将一个 Observable
对象通过某种关系转换为另一个Observable
对象。在 2.x 中和 1.x 中作用几乎一致,不同点在于:2.x 将 1.x 中的 Func1
和 Func2
改为了 Function
和 BiFunction
。
采用 map 操作符进行网络数据解析
想必大家都知道,很多时候我们在使用 RxJava 的时候总是和 Retrofit 进行结合使用,而为了方便演示,这里我们就暂且采用 OkHttp3 进行演示,配合 map
,doOnNext
,线程切换进行简单的网络请求:
1)通过 Observable.create() 方法,调用 OkHttp 网络请求;
2)通过 map 操作符集合 gson,将 Response 转换为 bean 类;
3)通过 doOnNext() 方法,解析 bean 中的数据,并进行数据库存储等操作;
4)调度线程,在子线程中进行耗时操作任务,在主线程中更新 UI ;
5)通过 subscribe(),根据请求成功或者失败来更新 UI 。
Observable.create(new ObservableOnSubscribe<Response>() { @Override
public void subscribe(@NonNull ObservableEmitter<Response> e) throws Exception {
Builder builder = new Builder()
.url("http://api.avatardata.cn/MobilePlace/LookUp?key=ec47b85086be4dc8b5d941f5abd37a4e&mobileNumber=13021671512")
.get();
Request request = builder.build();
Call call = new OkHttpClient().newCall(request);
Response response = call.execute();
e.onNext(response);
}
}).map(new Function<Response, MobileAddress>() { @Override
public MobileAddress apply(@NonNull Response response) throws Exception { if (response.isSuccessful()) {
ResponseBody body = response.body(); if (body != null) {
Log.e(TAG, "map:转换前:" + response.body()); return new Gson().fromJson(body.string(), MobileAddress.class);
}
} return null;
}
}).observeOn(AndroidSchedulers.mainThread())
.doOnNext(new Consumer<MobileAddress>() { @Override
public void accept(@NonNull MobileAddress s) throws Exception {
Log.e(TAG, "doOnNext: 保存成功:" + s.toString() + "\n");
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<MobileAddress>() { @Override
public void accept(@NonNull MobileAddress data) throws Exception {
Log.e(TAG, "成功:" + data.toString() + "\n");
}, new Consumer<Throwable>() { @Override
public void accept(@NonNull Throwable throwable) throws Exception {
Log.e(TAG, "失败:" + throwable.getMessage() + "\n");
}
});
concat
concat
可以做到不交错的发射两个甚至多个 Observable 的发射事件,并且只有前一个 Observable
终止(onComplete
) 后才会订阅下一个 Observable
。
采用 concat 操作符先读取缓存再通过网络请求获取数据
想必在实际应用中,很多时候(对数据操作不敏感时)都需要我们先读取缓存的数据,如果缓存没有数据,再通过网络请求获取,随后在主线程更新我们的UI。
concat
操作符简直就是为我们这种需求量身定做。
利用 concat
的必须调用 onComplete
后才能订阅下一个 Observable
的特性,我们就可以先读取缓存数据,倘若获取到的缓存数据不是我们想要的,再调用 onComplete()
以执行获取网络数据的 Observable
,如果缓存数据能应我们所需,则直接调用 onNext()
,防止过度的网络请求,浪费用户的流量。
Observable<FoodList> cache = Observable.create(new ObservableOnSubscribe<FoodList>() { @Override
public void subscribe(@NonNull ObservableEmitter<FoodList> e) throws Exception {
Log.e(TAG, "create当前线程:"+Thread.currentThread().getName() );
FoodList data = CacheManager.getInstance().getFoodListData(); // 在操作符 concat 中,只有调用 onComplete 之后才会执行下一个 Observable
if (data != null){ // 如果缓存数据不为空,则直接读取缓存数据,而不读取网络数据
isFromNet = false;
Log.e(TAG, "\nsubscribe: 读取缓存数据:" );
runOnUiThread(new Runnable() { @Override
public void run() {
mRxOperatorsText.append("\nsubscribe: 读取缓存数据:\n");
}
});
e.onNext(data);
}else {
isFromNet = true;
runOnUiThread(new Runnable() { @Override
public void run() {
mRxOperatorsText.append("\nsubscribe: 读取网络数据:\n");
}
});
Log.e(TAG, "\nsubscribe: 读取网络数据:" );
e.onComplete();
}
}
});
Observable<FoodList> network = Rx2AndroidNetworking.get("http://www.tngou.net/api/food/list")
.addQueryParameter("rows",10+"")
.build()
.getObjectObservable(FoodList.class); // 两个 Observable 的泛型应当保持一致
Observable.concat(cache,network)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<FoodList>() { @Override
public void accept(@NonNull FoodList tngouBeen) throws Exception {
Log.e(TAG, "subscribe 成功:"+Thread.currentThread().getName() ); if (isFromNet){
mRxOperatorsText.append("accept : 网络获取数据设置缓存: \n");
Log.e(TAG, "accept : 网络获取数据设置缓存: \n"+tngouBeen.toString() );
CacheManager.getInstance().setFoodListData(tngouBeen);
}
mRxOperatorsText.append("accept: 读取数据成功:" + tngouBeen.toString()+"\n");
Log.e(TAG, "accept: 读取数据成功:" + tngouBeen.toString());
}
}, new Consumer<Throwable>() { @Override
public void accept(@NonNull Throwable throwable) throws Exception {
Log.e(TAG, "subscribe 失败:"+Thread.currentThread().getName() );
Log.e(TAG, "accept: 读取数据失败:"+throwable.getMessage() );
mRxOperatorsText.append("accept: 读取数据失败:"+throwable.getMessage()+"\n");
}
});
有时候我们的缓存可能还会分为 memory 和 disk ,实际上都差不多,无非是多写点 Observable
,然后通过 concat
合并即可。
flatMap 实现多个网络请求依次依赖
想必这种情况也在实际情况中比比皆是,例如用户注册成功后需要自动登录,我们只需要先通过注册接口注册用户信息,注册成功后马上调用登录接口进行自动登录即可。
我们的 flatMap
恰好解决了这种应用场景,flatMap
操作符可以将一个发射数据的 Observable
变换为多个 Observables
,然后将它们发射的数据合并后放到一个单独的 Observable
,利用这个特性,我们很轻松地达到了我们的需求。
Rx2AndroidNetworking.get("http://www.tngou.net/api/food/list")
.addQueryParameter("rows", 1 + "")
.build()
.getObjectObservable(FoodList.class) // 发起获取食品列表的请求,并解析到FootList
.subscribeOn(Schedulers.io()) // 在io线程进行网络请求
.observeOn(AndroidSchedulers.mainThread()) // 在主线程处理获取食品列表的请求结果
.doOnNext(new Consumer<FoodList>() { @Override
public void accept(@NonNull FoodList foodList) throws Exception { // 先根据获取食品列表的响应结果做一些操作
Log.e(TAG, "accept: doOnNext :" + foodList.toString());
mRxOperatorsText.append("accept: doOnNext :" + foodList.toString()+"\n");
}
})
.observeOn(Schedulers.io()) // 回到 io 线程去处理获取食品详情的请求
.flatMap(new Function<FoodList, ObservableSource<FoodDetail>>() { @Override
public ObservableSource<FoodDetail> apply(@NonNull FoodList foodList) throws Exception { if (foodList != null && foodList.getTngou() != null && foodList.getTngou().size() > 0) { return Rx2AndroidNetworking.post("http://www.tngou.net/api/food/show")
.addBodyParameter("id", foodList.getTngou().get(0).getId() + "")
.build()
.getObjectObservable(FoodDetail.class);
} return null;
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<FoodDetail>() { @Override
public void accept(@NonNull FoodDetail foodDetail) throws Exception {
Log.e(TAG, "accept: success :" + foodDetail.toString());
mRxOperatorsText.append("accept: success :" + foodDetail.toString()+"\n");
}
}, new Consumer<Throwable>() { @Override
public void accept(@NonNull Throwable throwable) throws Exception {
Log.e(TAG, "accept: error :" + throwable.getMessage());
mRxOperatorsText.append("accept: error :" + throwable.getMessage()+"\n");
}
});
善用 zip 操作符,实现多个接口数据共同更新 UI
在实际应用中,我们极有可能会在一个页面显示的数据来源于多个接口,这时候我们的 zip
操作符为我们排忧解难。
zip
操作符可以将多个 Observable
的数据结合为一个数据源再发射出去。
Observable<MobileAddress> observable1 = Rx2AndroidNetworking.get("http://api.avatardata.cn/MobilePlace/LookUp?key=ec47b85086be4dc8b5d941f5abd37a4e&mobileNumber=13021671512")
.build()
.getObjectObservable(MobileAddress.class);
Observable<CategoryResult> observable2 = Network.getGankApi()
.getCategoryData("Android",1,1);
Observable.zip(observable1, observable2, new BiFunction<MobileAddress, CategoryResult, String>() { @Override
public String apply(@NonNull MobileAddress mobileAddress, @NonNull CategoryResult categoryResult) throws Exception { return "合并后的数据为:手机归属地:"+mobileAddress.getResult().getMobilearea()+"人名:"+categoryResult.results.get(0).who;
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() { @Override
public void accept(@NonNull String s) throws Exception {
Log.e(TAG, "accept: 成功:" + s+"\n");
}
}, new Consumer<Throwable>() { @Override
public void accept(@NonNull Throwable throwable) throws Exception {
Log.e(TAG, "accept: 失败:" + throwable+"\n");
}
});
采用 interval 操作符实现心跳间隔任务
想必即时通讯等需要轮训的任务在如今的 APP 中已是很常见,而 RxJava 2.x 的 interval
操作符可谓完美地解决了我们的疑惑。
这里就简单的意思一下轮训。
private Disposable mDisposable; @Override
protected void doSomething() {
mDisposable = Flowable.interval(1, TimeUnit.SECONDS)
.doOnNext(new Consumer<Long>() { @Override
public void accept(@NonNull Long aLong) throws Exception {
Log.e(TAG, "accept: doOnNext : "+aLong );
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Long>() { @Override
public void accept(@NonNull Long aLong) throws Exception {
Log.e(TAG, "accept: 设置文本 :"+aLong );
mRxOperatorsText.append("accept: 设置文本 :"+aLong +"\n");
}
});
} /**
* 销毁时停止心跳
*/
@Override
protected void onDestroy() { super.onDestroy(); if (mDisposable != null){
mDisposable.dispose();
}
}
RxJava 1.x 如何平滑升级到 RxJava 2.x?
由于 RxJava 2.x 变化较大无法直接升级,幸运的是,官方为我们提供了 RxJava2Interrop) 这个库,可以方便地把 RxJava 1.x 升级到 RxJava 2.x,或者将 RxJava 2.x 转回到 RxJava 1.x。
写在最后
本酱看你都看到这儿了,实为未来的栋梁之才,所以且送你一本经书:
https://github.com/nanchen2251/RxJava2Examples